package org.luosl.webmagicx.pipeline

import java.io._
import java.util.concurrent.locks.{Lock, ReentrantLock}
import java.util

import org.luosl.webmagicx.conf.{ConfException, XmlProps, SpiderConf}
import us.codecraft.webmagic.{ResultItems, Task}
import org.apache.commons.csv.{CSVFormat, CSVParser, CSVPrinter}
import org.luosl.webmagicx.pipeline.component.{Distinct, HashSetDistinct}
import org.luosl.webmagicx.conf.PropType._
import org.luosl.webmagicx.conf.MatcherConverters._

import scala.collection.JavaConverters._


/**
  * Pipeline that appends every saved [[ResultItems]] as one row of a CSV file.
  *
  * Configuration is read from `props` (all lookups use the `"value"` attribute
  * unless noted otherwise):
  *  - `path`           : target CSV file (required).
  *  - `charset`        : file encoding, defaults to `utf-8`.
  *  - `needSaveFields` : `*` (default) to write all fields, or a list separated by
  *                       `,` / `，` of `field` or `field:csvColumn` entries.
  *  - `model`          : `append` (default) or `override`.
  *  - `distinct`       : optional; attributes `field` and `csvHeader` must be
  *                       configured together.
  *  - `closeAtTheEnd`  : whether [[close]] closes the printer (default `true`)
  *                       or only flushes it.
  *
  * Created by luosl on 2017/12/6.
  */
class CSVPipeline(sc:SpiderConf, task:Task, props:XmlProps) extends AbstractPipeline(sc, task, props){

  /** Target CSV file. */
  private val file:File = new File(props.value("path", "value")(strType))

  /** Encoding used both for reading the existing file and for writing. */
  private val charset:String = props.valueOrDefault("charset", "value")(strType)("utf-8")

  /**
    * `fields` are the result-item keys to write, `header` the matching CSV column
    * names, both in configuration order.
    */
  val (fields, header) = {
    val needSaveFields:String = props.valueOrDefault("needSaveFields", "value")(strType)("*")
    if(needSaveFields == "*"){
      (allFields, allFields)
    }else{
      needSaveFields.split(",|，").toList.map { item =>
        item.split(":") match {
          case Array(fieldName) => (fieldName, fieldName)
          case Array(fieldName, columnName) => (fieldName, columnName)
          case _ =>
            // Interpolate the expression itself: mkString on a String would
            // insert a separator between every single character.
            val errorMsg:String = s"无效的needSaveFields表达式:[needSaveFields=$needSaveFields]"
            throw new RuntimeException(errorMsg)
        }
      }.unzip
    }
  }

  /**
    * Format shared by the parser and the printer. The header record is skipped,
    * so the printer never emits it on its own; it is written explicitly when
    * `needBuildHeader` is set.
    */
  private val format:CSVFormat = CSVFormat.DEFAULT.withHeader(header:_*).withSkipHeaderRecord()

  private val isOverride:Boolean = props.valueOption("model", "value")(strType).contains("override")

  /**
    * Whether the header row has to be written. Must be evaluated before
    * `csvPrinter` opens (and thereby creates or truncates) the file.
    * An existing but empty file also needs a header; otherwise a later parse
    * with `format` would treat the first data row as the header and skip it.
    */
  private val needBuildHeader:Boolean = isOverride || !file.exists() || file.length() == 0L

  private val distinctOptVal:Option[Distinct] = {
    val distinctFieldOpt:Option[String] = props.valueOption("distinct", "field")(strType)
    val distinctCsvHeaderOpt:Option[String] = props.valueOption("distinct", "csvHeader")(strType)
    (distinctFieldOpt, distinctCsvHeaderOpt) match {
      case (Some(field), Some(csvHeader)) =>
        // Fills the given set with the `csvHeader` column of every record
        // already present in the file (nothing to load in override mode).
        val loadCacheOp:util.HashSet[Any]=>Unit = (set:util.HashSet[Any])=> {
          if(file.exists() && !isOverride){
            val reader = new InputStreamReader(new FileInputStream(file),charset)
            try {
              format.parse(reader).iterator().asScala.foreach{ rec=>
                val distValue:String = rec.get(csvHeader)
                if(null == distValue) throw ConfException(s"无效的csvHeader:$csvHeader")
                set.add(distValue)
              }
            }catch {
              case ie:IllegalArgumentException =>
                throw ConfException(s"无效的[csvHeader=$csvHeader]", ie)
            }finally {
              reader.close()
            }
          }
        }
        // The explicit type argument keeps the generic Java getter from being
        // inferred as returning Nothing.
        val distValOp:ResultItems => Any = (ris:ResultItems) => ris.get[AnyRef](field)
        Option(new HashSetDistinct(loadCacheOp,distValOp))
      case (None, None) => None
      case _ => throw ConfException("distinct 必须同时配置 field , csvHeader 属性")
    }
  }

  /**
    * Printer writing to `file`; opened in append or truncate mode according to
    * the `model` property. Writes the header row when `needBuildHeader` is set.
    */
  val csvPrinter:CSVPrinter = {
    val append:Boolean = props.valueOrDefault("model", "value")(strType)("append") match {
      case "append" => true
      case "override" => false
      case other =>
        val msg:String = s"无效的Model:$other,CSVPipeline的[model]属性只支持[append,override]"
        throw new ConfException(msg)
    }
    val out:FileOutputStream = new FileOutputStream(file, append)
    try {
      val printer = new CSVPrinter(new OutputStreamWriter(out, charset), format)
      if(needBuildHeader) printer.printRecord(header:_*)
      printer
    } catch {
      case e:Throwable =>
        // Construction failed: release the file handle, then propagate the
        // original error unchanged.
        try out.close() catch { case _:IOException => }
        throw e
    }
  }

  /** Serializes writes to `csvPrinter` in [[save]]. */
  val lock:Lock = new ReentrantLock()


  /**
    * Returns the configured de-duplicator, if any.
    *
    * @return the distinct component, or `None` when `distinct` is not configured
    */
  override def distinctOpt(): Option[Distinct] = distinctOptVal

  /**
    * Writes one CSV row containing the values of `fields` taken from `resultItems`.
    *
    * @param resultItems resultItems
    * @param task        task
    */
  override def save(resultItems: ResultItems, task: Task): Unit = {
    val item:Array[String] = fields.map(key=> resultItems.get[String](key)).toArray
    // Acquire before `try` so that `unlock` only ever runs on a held lock.
    lock.lock()
    try {
      csvPrinter.printRecord(item:_*)
    } finally lock.unlock()
  }

  /**
    * Closes the printer when `closeAtTheEnd` is true (the default); otherwise
    * only flushes it.
    */
  override def close(): Unit ={
    if(props.valueOrDefault("closeAtTheEnd","value")(booleanType)(true)) {
      csvPrinter.close()
    }else{
      csvPrinter.flush()
    }
  }

}
